
Learning HBase (Part 5): Using MapReduce with HBase


Writing Data from HDFS into HBase with MapReduce

Suppose HDFS already holds a file named student.txt in the following format:

95002,刘晨,女,19,IS
95017,王风娟,女,18,IS
95018,王一,女,19,IS
95013,冯伟,男,21,CS
95014,王小丽,女,19,CS
95019,邢小丽,女,19,IS
95020,赵钱,男,21,IS
95003,王敏,女,22,MA
95004,张立,男,19,IS
95012,孙花,女,20,CS
95010,孔小涛,男,19,CS
95005,刘刚,男,18,MA
95006,孙庆,男,23,CS
95007,易思玲,女,19,MA
95008,李娜,女,18,CS
95021,周二,男,17,MA
95022,郑明,男,20,MA
95001,李勇,男,20,CS
95011,包小柏,男,18,MA
95009,梦圆圆,女,18,MA
95015,王君,男,18,MA

The goal is to write the data from this HDFS file into an HBase table. The job below writes into a table named student with a column family info, so that table has to exist before the job runs; a sketch for creating it follows.
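In the HBase shell the table can be created with create 'student','info'. The following is a minimal Java sketch using the classic Admin API (the class name CreateStudentTable is illustrative, the ZooKeeper quorum is assumed to match the cluster used in this post, and on HBase 2.x the TableDescriptorBuilder API would be used instead):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class CreateStudentTable {

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // Assumed to match the cluster used in the MapReduce jobs below
        conf.set("hbase.zookeeper.quorum", "hadoop1:2181,hadoop2:2181,hadoop3:2181");

        try (Connection connection = ConnectionFactory.createConnection(conf);
             Admin admin = connection.getAdmin()) {
            TableName tableName = TableName.valueOf("student");
            if (!admin.tableExists(tableName)) {
                // One column family "info" is enough for the fields in student.txt
                HTableDescriptor desc = new HTableDescriptor(tableName);
                desc.addFamily(new HColumnDescriptor("info"));
                admin.createTable(desc);
            }
        }
    }
}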

The MapReduce implementation is as follows:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ReadHDFSDataToHbaseMR extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        int run = ToolRunner.run(new ReadHDFSDataToHbaseMR(), args);
        System.exit(run);
    }

    @Override
    public int run(String[] arg0) throws Exception {

        Configuration conf = HBaseConfiguration.create();
        conf.set("fs.defaultFS", "hdfs://myha01/");
        conf.set("hbase.zookeeper.quorum", "hadoop1:2181,hadoop2:2181,hadoop3:2181");
        System.setProperty("HADOOP_USER_NAME", "hadoop");
        FileSystem fs = FileSystem.get(conf);
        // conf.addResource("config/core-site.xml");
        // conf.addResource("config/hdfs-site.xml");

        Job job = Job.getInstance(conf);

        job.setJarByClass(ReadHDFSDataToHbaseMR.class);

        job.setMapperClass(HDFSToHbaseMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        TableMapReduceUtil.initTableReducerJob("student", HDFSToHbaseReducer.class, job, null, null, null, null, false);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Put.class);

        Path inputPath = new Path("/student/input/");
        Path outputPath = new Path("/student/output/");

        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }

        FileInputFormat.addInputPath(job, inputPath);
        FileOutputFormat.setOutputPath(job, outputPath);

        boolean isDone = job.waitForCompletion(true);

        return isDone ? 0 : 1;
    }

    public static class HDFSToHbaseMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            context.write(value, NullWritable.get());
        }

    }

    /**
     * 95015,王君,男,18,MA
     */
    public static class HDFSToHbaseReducer extends TableReducer<Text, NullWritable, NullWritable> {

        @Override
        protected void reduce(Text key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {

            String[] split = key.toString().split(",");

            Put put = new Put(split[0].getBytes());

            put.addColumn("info".getBytes(), "name".getBytes(), split[1].getBytes());
            put.addColumn("info".getBytes(), "sex".getBytes(), split[2].getBytes());
            put.addColumn("info".getBytes(), "age".getBytes(), split[3].getBytes());
            put.addColumn("info".getBytes(), "department".getBytes(), split[4].getBytes());

            context.write(NullWritable.get(), put);
        }

    }

}
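After this job finishes, the rows can be spot-checked with an ordinary client-side scan. A small sketch, not part of the original job (the class name ScanStudentTable is illustrative and the quorum is assumed as above):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class ScanStudentTable {

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "hadoop1:2181,hadoop2:2181,hadoop3:2181");

        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table table = connection.getTable(TableName.valueOf("student"));
             ResultScanner scanner = table.getScanner(new Scan())) {
            // Print row key, name and age for every loaded student
            for (Result result : scanner) {
                String rowKey = Bytes.toString(result.getRow());
                String name = Bytes.toString(result.getValue("info".getBytes(), "name".getBytes()));
                String age = Bytes.toString(result.getValue("info".getBytes(), "age".getBytes()));
                System.out.println(rowKey + "\t" + name + "\t" + age);
            }
        }
    }
}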

Reading Data from HBase with MapReduce to Compute the Average Age and Store It in HDFS

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;



public class ReadHbaseDataToHDFS extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        int run = ToolRunner.run(new ReadHbaseDataToHDFS(), args);
        System.exit(run);
    }

    @Override
    public int run(String[] arg0) throws Exception {

        Configuration conf = HBaseConfiguration.create();
        conf.set("fs.defaultFS", "hdfs://myha01/");
        conf.set("hbase.zookeeper.quorum", "hadoop1:2181,hadoop2:2181,hadoop3:2181");
        System.setProperty("HADOOP_USER_NAME", "hadoop");
        FileSystem fs = FileSystem.get(conf);
        // conf.addResource("config/core-site.xml");
        // conf.addResource("config/hdfs-site.xml");

        Job job = Job.getInstance(conf);

        job.setJarByClass(ReadHbaseDataToHDFS.class);

        // Only fetch the column this job actually needs: info:age
        Scan scan = new Scan();
        scan.addColumn("info".getBytes(), "age".getBytes());

        TableMapReduceUtil.initTableMapperJob(
                "student".getBytes(),      // table to read from
                scan,                      // scan defining which data is fetched
                HbaseToHDFSMapper.class,   // mapper class
                Text.class,                // type of the mapper output key
                IntWritable.class,         // type of the mapper output value
                job,                       // the job object
                false
        );

        job.setReducerClass(HbaseToHDFSReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);

        Path outputPath = new Path("/student/avg/");

        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }

        FileOutputFormat.setOutputPath(job, outputPath);

        boolean isDone = job.waitForCompletion(true);

        return isDone ? 0 : 1;
    }

    public static class HbaseToHDFSMapper extends TableMapper<Text, IntWritable> {

        Text outKey = new Text("age");
        IntWritable outValue = new IntWritable();

        // key is the HBase row key
        // value holds all cells returned by the scan for that row
        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context)
                throws IOException, InterruptedException {

            boolean isContainsColumn = value.containsColumn("info".getBytes(), "age".getBytes());

            if (isContainsColumn) {
                List<Cell> listCells = value.getColumnCells("info".getBytes(), "age".getBytes());
                System.out.println("listCells:\t" + listCells);
                Cell cell = listCells.get(0);
                System.out.println("cells:\t" + cell);

                byte[] cloneValue = CellUtil.cloneValue(cell);
                String ageValue = Bytes.toString(cloneValue);
                outValue.set(Integer.parseInt(ageValue));

                context.write(outKey, outValue);
            }
        }

    }

    public static class HbaseToHDFSReducer extends Reducer<Text, IntWritable, Text, DoubleWritable> {

        DoubleWritable outValue = new DoubleWritable();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {

            int count = 0;
            int sum = 0;
            for (IntWritable value : values) {
                count++;
                sum += value.get();
            }

            double avgAge = sum * 1.0 / count;
            outValue.set(avgAge);
            context.write(key, outValue);
        }

    }

}
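The reducer writes its result under /student/avg/ as plain text, one key and value per line separated by a tab. Assuming the table holds exactly the 21 sample rows loaded earlier, the ages sum to 405, so the emitted line should be the key "age" followed by 405/21 ≈ 19.29. A minimal sketch for printing the result, assuming the default single-reducer output file name part-r-00000 (the class name PrintAverageAge is illustrative):

import java.io.BufferedReader;
import java.io.InputStreamReader;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class PrintAverageAge {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://myha01/");
        System.setProperty("HADOOP_USER_NAME", "hadoop");

        FileSystem fs = FileSystem.get(conf);
        // Default output file name of a single-reducer job (assumption)
        Path resultFile = new Path("/student/avg/part-r-00000");

        try (BufferedReader reader =
                     new BufferedReader(new InputStreamReader(fs.open(resultFile)))) {
            String line;
            while ((line = reader.readLine()) != null) {
                // Prints a line of the form "age<TAB>average"
                System.out.println(line);
            }
        }
    }
}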